Cumulative statistics
Preparing the data
access.csv:

```
A,2015-01,5
A,2015-01,15
A,2015-01,5
A,2015-01,8
A,2015-02,4
A,2015-02,6
A,2015-03,16
A,2015-03,22
A,2015-04,10
A,2015-04,50
B,2015-01,5
B,2015-01,25
B,2015-02,10
B,2015-02,5
B,2015-03,23
B,2015-03,10
B,2015-03,1
B,2015-04,10
B,2015-04,50
```
Preparing the environment
```scala
object AccumulatorCount {
```
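A minimal sketch of how this setup object might look in full, assuming a local SparkSession and that access.csv is read into usersDF with the columns name, mounth, and amount (the file path and everything inside main are assumptions):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object AccumulatorCount {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("AccumulatorCount")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Read the sample file and name the columns to match the code below (path is an assumption)
    val usersDF = spark.read
      .csv("data/access.csv")
      .toDF("name", "mounth", "amount")
      .withColumn("amount", $"amount".cast("int"))

    // the window examples from the following sections go here

    spark.stop()
  }
}
```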
Implementation logic
DataFrame API approach
Approach 1:

```scala
// rowsBetween(Long.MinValue, 0): the frame runs from the earliest row in the partition up to the current row
val accuCntSpec = Window.partitionBy("name").orderBy("mounth").rowsBetween(Long.MinValue, 0)
usersDF.withColumn("acc_amount", sum(usersDF("amount")).over(accuCntSpec)).show()
```
Approach 2:

```scala
usersDF.select(
  $"name",
  $"mounth",
  $"amount",
  sum($"amount").over(accuCntSpec).as("acc_amount")
).show()
```
SQL approach
The idea: starting from what the DataFrame operator does, look in the SqlBase.g4 grammar file to see whether this kind of SQL is supported. SqlBase.g4 contains exactly what we need:

```
windowFrame
    : frameType=RANGE start=frameBound
    | frameType=ROWS start=frameBound
    | frameType=RANGE BETWEEN start=frameBound AND end=frameBound
    | frameType=ROWS BETWEEN start=frameBound AND end=frameBound
    ;

frameBound
    : UNBOUNDED boundType=(PRECEDING | FOLLOWING)
    | boundType=CURRENT ROW
    | expression boundType=(PRECEDING | FOLLOWING)
    ;
```
In the Spark source, in the sql/core module under the org.apache.spark.sql.execution package, the SQLWindowFunctionSuite class contains the following test method:

```scala
test("window function: multiple window expressions in a single expression") {
  val nums = sparkContext.parallelize(1 to 10).map(x => (x, x % 2)).toDF("x", "y")
  nums.createOrReplaceTempView("nums")

  val expected =
    Row(1, 1, 1, 55, 1, 57) ::
    Row(0, 2, 3, 55, 2, 60) ::
    Row(1, 3, 6, 55, 4, 65) ::
    Row(0, 4, 10, 55, 6, 71) ::
    Row(1, 5, 15, 55, 9, 79) ::
    Row(0, 6, 21, 55, 12, 88) ::
    Row(1, 7, 28, 55, 16, 99) ::
    Row(0, 8, 36, 55, 20, 111) ::
    Row(1, 9, 45, 55, 25, 125) ::
    Row(0, 10, 55, 55, 30, 140) :: Nil

  val actual = sql(
    """
      |SELECT
      |  y,
      |  x,
      |  sum(x) OVER w1 AS running_sum,
      |  sum(x) OVER w2 AS total_sum,
      |  sum(x) OVER w3 AS running_sum_per_y,
      |  ((sum(x) OVER w1) + (sum(x) OVER w2) + (sum(x) OVER w3)) as combined2
      |FROM nums
      |WINDOW w1 AS (ORDER BY x ROWS BETWEEN UnBOUNDED PRECEDiNG AND CuRRENT RoW),
      |       w2 AS (ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOuNDED FoLLOWING),
      |       w3 AS (PARTITION BY y ORDER BY x ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
    """.stripMargin)

  checkAnswer(actual, expected)
  spark.catalog.dropTempView("nums")
}
```
Now we can happily write the SQL by following that example:

```scala
usersDF.createOrReplaceTempView("access")
spark.sql(
  """
    |select name,
    |       mounth,
    |       amount,
    |       sum(amount) over (partition by name order by mounth asc rows between unbounded preceding and current row) as acc_amount
    |from access
  """.stripMargin).show()
```
Cumulative sum over the previous N rows, assuming N = 3
DataFrame API approach
```scala
val preThreeAccuCntSpec = Window.partitionBy("name").orderBy("mounth").rowsBetween(-3, 0)
```
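Applying the spec follows the same pattern as before; a quick sketch reusing the usersDF from the setup:

```scala
// -3 to 0: the three preceding rows plus the current row within each name partition
usersDF.withColumn("acc_amount", sum($"amount").over(preThreeAccuCntSpec)).show()
```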
SQL approach
```scala
spark.sql(
```
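A fuller sketch of this statement; the frame clause `rows between 3 preceding and current row` mirrors rowsBetween(-3, 0), and access is the view registered earlier:

```scala
spark.sql(
  """
    |select name,
    |       mounth,
    |       amount,
    |       sum(amount) over (partition by name order by mounth asc rows between 3 preceding and current row) as acc_amount
    |from access
  """.stripMargin).show()
```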
Cumulative sum over the previous 3 rows and the following 3 rows
DataFrame API approach
```scala
// -3 to 3: three rows before through three rows after the current row
val preThreeFiveAccuCntSpec = Window.partitionBy("name").orderBy("mounth").rowsBetween(-3, 3)
```
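Applied the same way (a sketch reusing usersDF):

```scala
usersDF.withColumn("acc_amount", sum($"amount").over(preThreeFiveAccuCntSpec)).show()
```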
SQL approach
```scala
spark.sql(
```
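A fuller sketch of the statement; `rows between 3 preceding and 3 following` is the SQL equivalent of rowsBetween(-3, 3):

```scala
spark.sql(
  """
    |select name,
    |       mounth,
    |       amount,
    |       sum(amount) over (partition by name order by mounth asc rows between 3 preceding and 3 following) as acc_amount
    |from access
  """.stripMargin).show()
```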
Basic window function examples
Preparing the environment
```scala
object WindowFunctionTest extends BaseSparkSession {
```
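A minimal sketch of what the full setup might look like. The shape of BaseSparkSession is not shown in the original, so a trivial version is assumed here, and the sample columns (site, date, pv) are assumptions chosen to match the window specs used below:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumed shape of BaseSparkSession: it only has to expose a SparkSession named spark
trait BaseSparkSession {
  val spark: SparkSession = SparkSession.builder()
    .appName("WindowFunctionTest")
    .master("local[*]")
    .getOrCreate()
}

object WindowFunctionTest extends BaseSparkSession {

  def main(args: Array[String]): Unit = {
    import spark.implicits._

    // Hypothetical sample data: site and date are the columns referenced by the window
    // specs below; pv is an assumed metric column to aggregate over
    val df = Seq(
      ("site1", "2015-01-01", 10),
      ("site1", "2015-01-02", 20),
      ("site1", "2015-01-03", 15),
      ("site2", "2015-01-01", 5),
      ("site2", "2015-01-02", 8)
    ).toDF("site", "date", "pv")

    // the window function examples from the following sections go here
  }
}
```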
Moving average
DataFrame API implementation
```scala
// The frame runs from -1 (the previous row) to 1 (the next row), so every sliding window holds 3 rows
```
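A sketch of the moving-average code that the comment describes; pv is the assumed metric column from the setup sketch and the spec name is made up:

```scala
val movingAvgSpec = Window.partitionBy("site").orderBy("date").rowsBetween(-1, 1)
df.withColumn("moving_avg", avg($"pv").over(movingAvgSpec)).show()
```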
SQL implementation
```scala
df.createOrReplaceTempView("site_info")
```
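The matching SQL might look like this (the pv column and the 1-preceding/1-following frame are assumptions mirroring the API version):

```scala
spark.sql(
  """
    |select site,
    |       date,
    |       pv,
    |       avg(pv) over (partition by site order by date rows between 1 preceding and 1 following) as moving_avg
    |from site_info
  """.stripMargin).show()
```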
Previous row's value
DataFrame API implementation
```scala
val lagwSpec = Window.partitionBy("site").orderBy("date")
```
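A sketch of how the spec would be used; lag(col, 1) returns the previous row's value within the partition, or null for the first row (pv is the assumed metric column):

```scala
df.withColumn("prev_pv", lag($"pv", 1).over(lagwSpec)).show()
```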
SQL implementation
```scala
df.createOrReplaceTempView("site_info")
```
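The SQL counterpart, sketched with the same assumed pv column:

```scala
spark.sql(
  """
    |select site,
    |       date,
    |       pv,
    |       lag(pv, 1) over (partition by site order by date) as prev_pv
    |from site_info
  """.stripMargin).show()
```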
Ranking
DataFrame API implementation
```scala
val rankwSpec = Window.partitionBy("site").orderBy("date")
```
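A sketch of applying the spec with rank():

```scala
df.withColumn("rk", rank().over(rankwSpec)).show()
```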
SQL approach
```scala
spark.sql(
```
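A fuller sketch of the statement, mirroring the API version:

```scala
spark.sql(
  """
    |select site,
    |       date,
    |       rank() over (partition by site order by date) as rk
    |from site_info
  """.stripMargin).show()
```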
Group top-N and group minimum
```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
```
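The usual row_number trick covers both group top-N and group minimum; a sketch against the site_info data (pv and n are assumptions):

```scala
// Group top-N: number the rows inside each site by pv descending and keep the first n
val n = 3
val topnSpec = Window.partitionBy("site").orderBy($"pv".desc)
df.withColumn("rn", row_number().over(topnSpec))
  .where($"rn" <= n)
  .show()

// Group minimum: the same trick with an ascending order keeps the smallest row per site
val minSpec = Window.partitionBy("site").orderBy($"pv".asc)
df.withColumn("rn", row_number().over(minSpec))
  .where($"rn" === 1)
  .show()
```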
An elegant way to define the schema
```scala
def getScheme(): StructType = {
```
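A sketch of one tidy way to build the StructType, with field names assumed to match the access data:

```scala
def getScheme(): StructType = {
  StructType(
    Seq(
      StructField("name", StringType, nullable = true),
      StructField("mounth", StringType, nullable = true),
      StructField("amount", IntegerType, nullable = true)
    )
  )
}
```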
Keeping n digits after the decimal point
In decimal(10, 2), 10 is the total number of digits (precision) and 2 is the number of digits kept after the decimal point (scale); the precision must be >= the actual number of digits, otherwise the result is NULL.

```scala
spark.sql("select cast(sale_amount as decimal(10, 2)) from ycd").show()
```
Renaming columns
```scala
import org.apache.spark.sql.SparkSession
```
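A minimal sketch of renaming a column with withColumnRenamed (the session setup and column names are assumptions):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("RenameColumnExample")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val df = Seq(("A", "2015-01", 5)).toDF("name", "mounth", "amount")

// withColumnRenamed returns a new DataFrame with the old column name replaced
df.withColumnRenamed("mounth", "month").show()
```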
Transpose
```scala
import org.apache.spark.sql.DataFrame
```
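A sketch of one common way to transpose in Spark, turning the month values into columns with groupBy + pivot (the helper name and the use of usersDF are assumptions):

```scala
import org.apache.spark.sql.DataFrame

// One row per name, one column per month, amounts summed into each cell
def transposeByMonth(df: DataFrame): DataFrame =
  df.groupBy("name").pivot("mounth").sum("amount")

transposeByMonth(usersDF).show()
```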